package com.common.util;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;

/**
 * Manual smoke tests for a RabbitMQ broker: an endless publisher and a group of consumers
 * that work against the same durable queue.
 *
 * <p>Created by Administrator on 2017/1/6.
 *
 * <p>Both tests connect to {@code localhost:5672} with the {@code guest} account and never
 * finish on their own; they run until the broker connection fails or the test is stopped
 * from the outside.
 */
public class RabbitMqUtil {

    private final static String QUEUE_NAME = "hello world";

    /** Number of consumers (one channel each) registered by {@link #customer()}. */
    private static final int CONSUMER_COUNT = 20;

    /** Sleep interval of the loop that keeps {@link #customer()} alive. */
    private static final long KEEP_ALIVE_SLEEP_MILLIS = 10 * 1000L;

    /**
     * Total number of messages consumed so far. Written only through
     * {@link #nextConsumedCount()} so that concurrent deliveries do not lose updates.
     */
    Integer count = 0;

    /**
     * Publishes the same text message to {@link #QUEUE_NAME} in an endless loop and prints a
     * running total after every publish.
     *
     * <p>The loop only ends when something throws (for example the broker connection is lost);
     * the exception is printed and the channel and connection are then closed.
     */
    @Test
    public void producer() {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = newConnectionFactory().newConnection();
            channel = connection.createChannel();
            // Declare the queue: name, durable (survives a broker restart), not exclusive,
            // not auto-deleted.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message = "hello world..."; // the text to send
            // Encode once and with an explicit charset instead of on every iteration with the
            // platform default.
            byte[] payload = message.getBytes("UTF-8");
            // A long avoids per-iteration boxing and does not wrap after 2^31 - 1 messages.
            long sent = 0;
            while (true) {
                sent++;
                // Publish via the default exchange (empty name), using the queue name as routing key.
                channel.basicPublish("", QUEUE_NAME, null, payload);
                System.out.println("Send message -> " + message + "数量:" + sent);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
            System.out.println("Closed the channel and conn.");
        }
    }

    /**
     * Registers {@link #CONSUMER_COUNT} auto-acknowledging consumers on {@link #QUEUE_NAME},
     * each on its own channel, and then blocks so the test process stays alive while
     * deliveries are printed.
     *
     * <p>Each consumer is registered exactly once: {@code basicConsume} only starts the
     * subscription and returns, so calling it in a loop would keep adding new consumers.
     *
     * @throws IOException          if the connection, a channel or a consumer cannot be set up
     * @throws TimeoutException     if connecting to the broker times out
     * @throws InterruptedException if the keep-alive sleep is interrupted; the connection is
     *                              closed before the exception propagates
     */
    @Test
    public void customer() throws IOException, TimeoutException, InterruptedException {
        Connection connection = newConnectionFactory().newConnection();
        try {
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                startConsumer(connection);
            }
            // Deliveries are handled in the consumer callbacks; this thread only has to keep
            // the test from returning.
            while (true) {
                Thread.sleep(KEEP_ALIVE_SLEEP_MILLIS);
            }
        } finally {
            closeQuietly(null, connection);
        }
    }

    /**
     * Opens a dedicated channel on {@code connection}, makes sure the queue exists and
     * subscribes one consumer that counts every delivery.
     *
     * @param connection open broker connection the new channel is created on
     * @throws IOException if the channel cannot be created or the queue/consumer setup fails
     */
    private void startConsumer(Connection connection) throws IOException {
        // One channel per consumer so that no channel is shared between consumers.
        Channel channel = connection.createChannel();
        // Same declaration as in producer(): durable, not exclusive, not auto-deleted.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费总条数:" + nextConsumedCount());
            }
        };
        // autoAck = true: messages count as acknowledged on delivery, so no explicit ack is
        // sent. Think carefully before using this outside of a test.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

    /**
     * Atomically increments {@link #count}.
     *
     * @return the value of {@link #count} after the increment
     */
    private synchronized int nextConsumedCount() {
        count = count + 1;
        return count;
    }

    /**
     * Builds a connection factory with the broker settings shared by both tests.
     *
     * @return a factory pointing at {@code localhost:5672} with the {@code guest} credentials
     */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // the real broker address has been removed here
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
        return factory;
    }

    /**
     * Closes the channel and then the connection, each independently, so that a failure while
     * closing the channel cannot leave the connection open.
     *
     * @param channel    channel to close, or {@code null} to skip
     * @param connection connection to close, or {@code null} to skip
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        if (channel != null) {
            try {
                channel.close();
            } catch (Exception ex) {
                System.out.println("关闭异常,应该是已经关闭了");
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception ex) {
                System.out.println("关闭异常,应该是已经关闭了");
            }
        }
    }

}


